
/** 
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package com.tompai.neo4j;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.TransactionWork;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.Values;

import com.tompai.neo4j.batis.MybatisUtil;
import com.tompai.neo4j.dao.TwitterRelapMapper;
import com.tompai.neo4j.log.BaseLoger;
import com.tompai.neo4j.model.TwitterRelap;

/**
 * @desc: neo4j-tools
 * @name: TwitterBoltToNeo4j.java
 * @author: tompai
 * @createTime: 2018年12月31日 下午4:14:46
 * @history:
 * @version: v1.0
 */

public class TwitterBoltToNeo4j extends BaseLoger implements AutoCloseable {

	private Driver driver;

	public TwitterBoltToNeo4j(String uri, String username, String password) {
		Config config = Config.builder().withMaxConnectionLifetime(30, TimeUnit.MINUTES).withMaxConnectionPoolSize(50)
				.withConnectionAcquisitionTimeout(2, TimeUnit.MINUTES).withConnectionTimeout(15, TimeUnit.SECONDS)
				.withMaxTransactionRetryTime(15, TimeUnit.SECONDS).build();
		driver = GraphDatabase.driver(uri, AuthTokens.basic(username, password), config);
	}

	/**
	 * @author: tompai
	 * @createTime: 2018年12月31日 下午4:14:46
	 * @history:
	 * @param args void
	 */

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		TwitterRelapMapper twitterRelapMapper = MybatisUtil.getSession().getMapper(TwitterRelapMapper.class);
		String uri = "bolt://192.168.2.8:7687";
		String username = "neo4j";
		String password = "neo4jneo4j";
		logger.info("Neo4j server start.");
		long start = System.currentTimeMillis(); // 当前的时间
		try (TwitterBoltToNeo4j bolter = new TwitterBoltToNeo4j(uri, username, password)) {
			// twitterBolt.printGreeting("Hello Neo4j.");
			TwitterRelap relap = new TwitterRelap();
			long index = 1L;
			relap = twitterRelapMapper.selectById(index);
			do {
				logger.info("Done index:{}", index);
				String namea = relap.getAccounta();
				String nameb = relap.getAccountb();
				String role = relap.getRole();
				Date date = relap.getInputTime();
				String time = Date2String(date);
				logger.info("begain:{}-{}-{}", namea, role, nameb);
				if (!bolter.findPerson(namea)) {
					bolter.addPerson(namea);
					if (!bolter.findPerson(nameb)) {
						bolter.addPerson(nameb);
					}
					bolter.addRelation(namea, nameb, role, time);
				} else {
					if (!bolter.findPerson(nameb)) {
						bolter.addPerson(nameb);
					}
					bolter.addRelation(namea, nameb, role, time);
				}
				index++;
				relap = twitterRelapMapper.selectById(index);
				if(index%500==0) {
					long end = System.currentTimeMillis(); // 当前的时间
					logger.info("{}:Time Use: {} seconds", index,(end - start) / 1000.0);
				}
			} while (index<1000000);//relap!=null
		} catch (Exception e) {
			// TODO: handle exception
			logger.error(e.getMessage());
		}
		long end = System.currentTimeMillis(); // 当前的时间
		logger.info("Time Use: {} seconds", (end - start) / 1000.0);
	}

	public void addRelation(final String nameA, final String nameB, final String role, final String time) {
		try (Session session = driver.session()) {
			session.writeTransaction(new TransactionWork<Void>() {
				@Override
				public Void execute(Transaction tx) {
					if (role.equals("following")) {
						createRelationFoing(tx, nameA, nameB, role, time);
					} else {
						createRelationFoers(tx, nameB, nameA, role, time);
					}
					return null;
				}
			});
		} catch (Exception e) {
			// TODO: handle exception
			logger.error("addRelation" + e.getMessage());
		}
	}

	private void createRelationFoers(Transaction tx, String nameA, String nameB, String role, String time) {
		String statementTemplate = "MATCH (a:Person {name:$nameA}) MATCH (b:Person {name:$nameB}) MERGE (a)-[r:FOLLOWERS{role:$role,time:$time}]->(b)";
		Value parameters = Values.parameters("nameA", nameA, "nameB", nameB, "role", role, "time", time);
		tx.run(statementTemplate, parameters);
		logger.info("add relation:{}-{}-{}", nameA, role, nameB);
	}

	private void createRelationFoing(Transaction tx, String nameA, String nameB, String role, String time) {
		String statementTemplate = "MATCH (a:Person {name:$nameA}) MATCH (b:Person {name:$nameB}) MERGE (a)-[r:FOLLOWING{role:$role,time:$time}]->(b)";
		Value parameters = Values.parameters("nameA", nameA, "nameB", nameB, "role", role, "time", time);
		tx.run(statementTemplate, parameters);
		logger.info("add relation:{}-{}-{}", nameA, role, nameB);
	}

	public boolean findPerson(final String name) {
		try (Session session = driver.session()) {
			return session.writeTransaction(new TransactionWork<Boolean>() {
				@Override
				public Boolean execute(Transaction tx) {
					return findNode(tx, name);
				}
			});
		} catch (Exception e) {
			// TODO: handle exception
			logger.error("findPerson:" + e.getMessage());
			return false;
		}
	}

	private boolean findNode(Transaction tx, String name) {
		String statementTemplate = "MATCH (a:Person {name:$name}) RETURN a.name";
		Value parameters = Values.parameters("name", name);
		StatementResult result = tx.run(statementTemplate, parameters);
		int index=0;
		while (result.hasNext()) {
			Record record = result.next();
			logger.info("Find {}", record.get("a.name"));
			index++;
		}
		if (index>0) {
			return true;
		}
		return false;
	}

	public void addPerson(final String name) {
		try (Session session = driver.session()) {
			session.writeTransaction(new TransactionWork<Integer>() {
				@Override
				public Integer execute(Transaction tx) {
					return crteateNode(tx, name);
				}
			});
		} catch (Exception e) {
			// TODO: handle exception
			logger.error("addPerson:" + e.getMessage());
		}
	}

	//Use MERGE to create a unique node
	private int crteateNode(Transaction tx, String name) {
		String statementTemplate = "MERGE (a:Person {name: $name})";
		Value parameters = Values.parameters("name", name);
		tx.run(statementTemplate, parameters);
		logger.info("create node:{}", name);
		return 1;
	}

	public void printGreeting(final String message) {
		// TODO Auto-generated method stub
		try (Session session = driver.session()) {
			String greeting = session.writeTransaction(new TransactionWork<String>() {
				@Override
				public String execute(Transaction tx) {
					// TODO Auto-generated method stub
					String statementTemplate = "CREATE (a:Greeting) SET a.message=$message RETURN a.message +', from node '+ id(a)";
					Value parameters = Values.parameters("message", message);
					StatementResult result = tx.run(statementTemplate, parameters);
					return result.single().get(0).asString();
				}
			});
			logger.info("greeting:{}", greeting);
		} catch (Exception e) {
			// TODO: handle exception
			logger.error(e.getMessage());
		}
	}

	private static String Date2String(Date date) {
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		String time = sdf.format(date);
		return time;
	}

	@Override
	public void close() throws Exception {
		// TODO Auto-generated method stub
		driver.closeAsync();
	}

}
